{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Introducing ML package of PySpark"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Predict chances of infant survival with ML"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Load the data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "First, we load the data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import pyspark.sql.types as typ\n",
    "\n",
    "labels = [\n",
    "    ('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),\n",
    "    ('BIRTH_PLACE', typ.StringType()),\n",
    "    ('MOTHER_AGE_YEARS', typ.IntegerType()),\n",
    "    ('FATHER_COMBINED_AGE', typ.IntegerType()),\n",
    "    ('CIG_BEFORE', typ.IntegerType()),\n",
    "    ('CIG_1_TRI', typ.IntegerType()),\n",
    "    ('CIG_2_TRI', typ.IntegerType()),\n",
    "    ('CIG_3_TRI', typ.IntegerType()),\n",
    "    ('MOTHER_HEIGHT_IN', typ.IntegerType()),\n",
    "    ('MOTHER_PRE_WEIGHT', typ.IntegerType()),\n",
    "    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),\n",
    "    ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),\n",
    "    ('DIABETES_PRE', typ.IntegerType()),\n",
    "    ('DIABETES_GEST', typ.IntegerType()),\n",
    "    ('HYP_TENS_PRE', typ.IntegerType()),\n",
    "    ('HYP_TENS_GEST', typ.IntegerType()),\n",
    "    ('PREV_BIRTH_PRETERM', typ.IntegerType())\n",
    "]\n",
    "\n",
    "schema = typ.StructType([\n",
    "    typ.StructField(e[0], e[1], False) for e in labels\n",
    "])\n",
    "\n",
    "births = spark.read.csv('births_transformed.csv.gz', \n",
    "                        header=True, \n",
    "                        schema=schema)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Create transformers"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import pyspark.ml.feature as ft\n",
    "\n",
    "births = births \\\n",
    "    .withColumn(       'BIRTH_PLACE_INT', \n",
    "                births['BIRTH_PLACE'] \\\n",
    "                    .cast(typ.IntegerType()))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Having done this, we can now create our first `Transformer`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "encoder = ft.OneHotEncoder(\n",
    "    inputCol='BIRTH_PLACE_INT', \n",
    "    outputCol='BIRTH_PLACE_VEC')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's now create a single column with all the features collated together. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "featuresCreator = ft.VectorAssembler(\n",
    "    inputCols=[\n",
    "        col[0] \n",
    "        for col \n",
    "        in labels[2:]] + \\\n",
    "    [encoder.getOutputCol()], \n",
    "    outputCol='features'\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Create an estimator"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In this example we will (once again) us the Logistic Regression model."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import pyspark.ml.classification as cl"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Once loaded, let's create the model."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "logistic = cl.LogisticRegression(\n",
    "    maxIter=10, \n",
    "    regParam=0.01, \n",
    "    labelCol='INFANT_ALIVE_AT_REPORT')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Create a pipeline"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "All that is left now is to creat a `Pipeline` and fit the model. First, let's load the `Pipeline` from the package."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "from pyspark.ml import Pipeline\n",
    "\n",
    "pipeline = Pipeline(stages=[\n",
    "        encoder, \n",
    "        featuresCreator, \n",
    "        logistic\n",
    "    ])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Fit the model"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Conventiently, `DataFrame` API has the `.randomSplit(...)` method."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "births_train, births_test = births \\\n",
    "    .randomSplit([0.7, 0.3], seed=666)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now run our `pipeline` and estimate our model."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "model = pipeline.fit(births_train)\n",
    "test_model = model.transform(births_test)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here's what the `test_model` looks like."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "test_model.take(1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Model performance"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Obviously, we would like to now test how well our model did."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import pyspark.ml.evaluation as ev\n",
    "\n",
    "evaluator = ev.BinaryClassificationEvaluator(\n",
    "    rawPredictionCol='probability', \n",
    "    labelCol='INFANT_ALIVE_AT_REPORT')\n",
    "\n",
    "print(evaluator.evaluate(test_model, \n",
    "     {evaluator.metricName: 'areaUnderROC'}))\n",
    "print(evaluator.evaluate(test_model, {evaluator.metricName: 'areaUnderPR'}))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Saving the model"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "PySpark allows you to save the `Pipeline` definition for later use."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "pipelinePath = './infant_oneHotEncoder_Logistic_Pipeline'\n",
    "pipeline.write().overwrite().save(pipelinePath)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "So, you can load it up later and use straight away to `.fit(...)` and predict."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "loadedPipeline = Pipeline.load(pipelinePath)\n",
    "loadedPipeline \\\n",
    "    .fit(births_train)\\\n",
    "    .transform(births_test)\\\n",
    "    .take(1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can also save the whole model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml import PipelineModel\n",
    "\n",
    "modelPath = './infant_oneHotEncoder_Logistic_PipelineModel'\n",
    "model.write().overwrite().save(modelPath)\n",
    "\n",
    "loadedPipelineModel = PipelineModel.load(modelPath)\n",
    "test_loadedModel = loadedPipelineModel.transform(births_test)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Parameter hyper-tuning"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Grid search"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Load the `.tuning` part of the package."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import pyspark.ml.tuning as tune"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next let's specify our model and the list of parameters we want to loop through."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "logistic = cl.LogisticRegression(\n",
    "    labelCol='INFANT_ALIVE_AT_REPORT')\n",
    "\n",
    "grid = tune.ParamGridBuilder() \\\n",
    "    .addGrid(logistic.maxIter,  \n",
    "             [2, 10, 50]) \\\n",
    "    .addGrid(logistic.regParam, \n",
    "             [0.01, 0.05, 0.3]) \\\n",
    "    .build()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, we need some way of comparing the models."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "evaluator = ev.BinaryClassificationEvaluator(\n",
    "    rawPredictionCol='probability', \n",
    "    labelCol='INFANT_ALIVE_AT_REPORT')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create the logic that will do the validation work for us."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "cv = tune.CrossValidator(\n",
    "    estimator=logistic, \n",
    "    estimatorParamMaps=grid, \n",
    "    evaluator=evaluator\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create a purely transforming `Pipeline`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "pipeline = Pipeline(stages=[encoder,featuresCreator])\n",
    "data_transformer = pipeline.fit(births_train)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Having done this, we are ready to find the optimal combination of parameters for our model."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "cvModel = cv.fit(data_transformer.transform(births_train))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `cvModel` will return the best model estimated. We can now use it to see if it performed better than our previous model."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "data_train = data_transformer \\\n",
    "    .transform(births_test)\n",
    "results = cvModel.transform(data_train)\n",
    "\n",
    "print(evaluator.evaluate(results, \n",
    "     {evaluator.metricName: 'areaUnderROC'}))\n",
    "print(evaluator.evaluate(results, \n",
    "     {evaluator.metricName: 'areaUnderPR'}))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "What parameters has the best model? The answer is a little bit convoluted but here's how you can extract it."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "results = [\n",
    "    (\n",
    "        [\n",
    "            {key.name: paramValue} \n",
    "            for key, paramValue \n",
    "            in zip(\n",
    "                params.keys(), \n",
    "                params.values())\n",
    "        ], metric\n",
    "    ) \n",
    "    for params, metric \n",
    "    in zip(\n",
    "        cvModel.getEstimatorParamMaps(), \n",
    "        cvModel.avgMetrics\n",
    "    )\n",
    "]\n",
    "\n",
    "sorted(results, \n",
    "       key=lambda el: el[1], \n",
    "       reverse=True)[0]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Train-Validation splitting"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Use the `ChiSqSelector` to select only top 5 features, thus limiting the complexity of our model."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "selector = ft.ChiSqSelector(\n",
    "    numTopFeatures=5, \n",
    "    featuresCol=featuresCreator.getOutputCol(), \n",
    "    outputCol='selectedFeatures',\n",
    "    labelCol='INFANT_ALIVE_AT_REPORT'\n",
    ")\n",
    "\n",
    "logistic = cl.LogisticRegression(\n",
    "    labelCol='INFANT_ALIVE_AT_REPORT',\n",
    "    featuresCol='selectedFeatures'\n",
    ")\n",
    "\n",
    "pipeline = Pipeline(stages=[encoder,featuresCreator,selector])\n",
    "data_transformer = pipeline.fit(births_train)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `TrainValidationSplit` object gets created in the same fashion as the `CrossValidator` model."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "tvs = tune.TrainValidationSplit(\n",
    "    estimator=logistic, \n",
    "    estimatorParamMaps=grid, \n",
    "    evaluator=evaluator\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As before, we fit our data to the model, and calculate the results."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "tvsModel = tvs.fit(\n",
    "    data_transformer \\\n",
    "        .transform(births_train)\n",
    ")\n",
    "\n",
    "data_train = data_transformer \\\n",
    "    .transform(births_test)\n",
    "results = tvsModel.transform(data_train)\n",
    "\n",
    "print(evaluator.evaluate(results, \n",
    "     {evaluator.metricName: 'areaUnderROC'}))\n",
    "print(evaluator.evaluate(results, \n",
    "     {evaluator.metricName: 'areaUnderPR'}))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Other features of PySpark ML in action"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Feature extraction"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### NLP related feature extractors"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Simple dataset."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "text_data = spark.createDataFrame([\n",
    "    ['''Machine learning can be applied to a wide variety \n",
    "        of data types, such as vectors, text, images, and \n",
    "        structured data. This API adopts the DataFrame from \n",
    "        Spark SQL in order to support a variety of data types.'''],\n",
    "    ['''DataFrame supports many basic and structured types; \n",
    "        see the Spark SQL datatype reference for a list of \n",
    "        supported types. In addition to the types listed in \n",
    "        the Spark SQL guide, DataFrame can use ML Vector types.'''],\n",
    "    ['''A DataFrame can be created either implicitly or \n",
    "        explicitly from a regular RDD. See the code examples \n",
    "        below and the Spark SQL programming guide for examples.'''],\n",
    "    ['''Columns in a DataFrame are named. The code examples \n",
    "        below use names such as \"text,\" \"features,\" and \"label.\"''']\n",
    "], ['input'])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "First, we need to tokenize this text."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "tokenizer = ft.RegexTokenizer(\n",
    "    inputCol='input', \n",
    "    outputCol='input_arr', \n",
    "    pattern='\\s+|[,.\\\"]')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The output of the tokenizer looks similar to this."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "tok = tokenizer \\\n",
    "    .transform(text_data) \\\n",
    "    .select('input_arr') \n",
    "\n",
    "tok.take(1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Use the `StopWordsRemover(...)`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "stopwords = ft.StopWordsRemover(\n",
    "    inputCol=tokenizer.getOutputCol(), \n",
    "    outputCol='input_stop')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The output of the method looks as follows"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "stopwords.transform(tok).select('input_stop').take(1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Build `NGram` model and the `Pipeline`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "ngram = ft.NGram(n=2, \n",
    "    inputCol=stopwords.getOutputCol(), \n",
    "    outputCol=\"nGrams\")\n",
    "\n",
    "pipeline = Pipeline(stages=[tokenizer, stopwords, ngram])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now that we have the `pipeline` we follow in the very similar fashion as before."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "data_ngram = pipeline \\\n",
    "    .fit(text_data) \\\n",
    "    .transform(text_data)\n",
    "    \n",
    "data_ngram.select('nGrams').take(1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "That's it. We got our n-grams and we can then use them in further NLP processing."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Discretize continuous variables"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "It is sometimes useful to *band* the values into discrete buckets."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "\n",
    "x = np.arange(0, 100)\n",
    "x = x / 100.0 * np.pi * 4\n",
    "y = x * np.sin(x / 1.764) + 20.1234\n",
    "\n",
    "schema = typ.StructType([\n",
    "    typ.StructField('continuous_var', \n",
    "                    typ.DoubleType(), \n",
    "                    False\n",
    "   )\n",
    "])\n",
    "\n",
    "data = spark.createDataFrame([[float(e), ] for e in y], schema=schema)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Use the `QuantileDiscretizer` model to split our continuous variable into 5 buckets (see the `numBuckets` parameter)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "discretizer = ft.QuantileDiscretizer(\n",
    "    numBuckets=5, \n",
    "    inputCol='continuous_var', \n",
    "    outputCol='discretized')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's see what we got."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "data_discretized = discretizer.fit(data).transform(data)\n",
    "\n",
    "data_discretized \\\n",
    "    .groupby('discretized')\\\n",
    "    .mean('continuous_var')\\\n",
    "    .sort('discretized')\\\n",
    "    .collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Standardizing continuous variables\n",
    "\n",
    "Create a vector representation of our continuous variable (as it is only a single float)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "vectorizer = ft.VectorAssembler(\n",
    "    inputCols=['continuous_var'], \n",
    "    outputCol= 'continuous_vec')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Build a `normalizer` and a `pipeline`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "normalizer = ft.StandardScaler(\n",
    "    inputCol=vectorizer.getOutputCol(), \n",
    "    outputCol='normalized', \n",
    "    withMean=True,\n",
    "    withStd=True\n",
    ")\n",
    "\n",
    "pipeline = Pipeline(stages=[vectorizer, normalizer])\n",
    "data_standardized = pipeline.fit(data).transform(data)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Classification"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We will now use the `RandomForestClassfier` to model the chances of survival for an infant.\n",
    "\n",
    "First, we need to cast the label feature to `DoubleType`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import pyspark.sql.functions as func\n",
    "\n",
    "births = births.withColumn(\n",
    "    'INFANT_ALIVE_AT_REPORT', \n",
    "    func.col('INFANT_ALIVE_AT_REPORT').cast(typ.DoubleType())\n",
    ")\n",
    "\n",
    "births_train, births_test = births \\\n",
    "    .randomSplit([0.7, 0.3], seed=666)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We are ready to build our model."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "classifier = cl.RandomForestClassifier(\n",
    "    numTrees=5, \n",
    "    maxDepth=5, \n",
    "    labelCol='INFANT_ALIVE_AT_REPORT')\n",
    "\n",
    "pipeline = Pipeline(\n",
    "    stages=[\n",
    "        encoder,\n",
    "        featuresCreator, \n",
    "        classifier])\n",
    "\n",
    "model = pipeline.fit(births_train)\n",
    "test = model.transform(births_test)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's now see how the `RandomForestClassifier` model performs compared to the `LogisticRegression`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "evaluator = ev.BinaryClassificationEvaluator(\n",
    "    labelCol='INFANT_ALIVE_AT_REPORT')\n",
    "print(evaluator.evaluate(test, \n",
    "    {evaluator.metricName: \"areaUnderROC\"}))\n",
    "print(evaluator.evaluate(test, \n",
    "    {evaluator.metricName: \"areaUnderPR\"}))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "Let's test how well would one tree do, then."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "classifier = cl.DecisionTreeClassifier(\n",
    "    maxDepth=5, \n",
    "    labelCol='INFANT_ALIVE_AT_REPORT')\n",
    "pipeline = Pipeline(stages=[\n",
    "        encoder,\n",
    "        featuresCreator, \n",
    "        classifier]\n",
    ")\n",
    "\n",
    "model = pipeline.fit(births_train)\n",
    "test = model.transform(births_test)\n",
    "\n",
    "evaluator = ev.BinaryClassificationEvaluator(\n",
    "    labelCol='INFANT_ALIVE_AT_REPORT')\n",
    "print(evaluator.evaluate(test, \n",
    "     {evaluator.metricName: \"areaUnderROC\"}))\n",
    "print(evaluator.evaluate(test, \n",
    "     {evaluator.metricName: \"areaUnderPR\"}))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Clustering"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In this example we will use k-means model to find similarities in the births data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import pyspark.ml.clustering as clus\n",
    "\n",
    "kmeans = clus.KMeans(k = 5, \n",
    "    featuresCol='features')\n",
    "\n",
    "pipeline = Pipeline(stages=[\n",
    "        encoder,\n",
    "        featuresCreator, \n",
    "        kmeans]\n",
    ")\n",
    "\n",
    "model = pipeline.fit(births_train)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Having estimated the model, let's see if we can find some differences between clusters."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "test = model.transform(births_test)\n",
    "\n",
    "test \\\n",
    "    .groupBy('prediction') \\\n",
    "    .agg({\n",
    "        '*': 'count', \n",
    "        'MOTHER_HEIGHT_IN': 'avg'\n",
    "    }).collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "In the field of NLP, problems such as topic extract rely on clustering to detect documents with similar topics. First, let's create our dataset."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "text_data = spark.createDataFrame([\n",
    "    ['''To make a computer do anything, you have to write a \n",
    "    computer program. To write a computer program, you have \n",
    "    to tell the computer, step by step, exactly what you want \n",
    "    it to do. The computer then \"executes\" the program, \n",
    "    following each step mechanically, to accomplish the end \n",
    "    goal. When you are telling the computer what to do, you \n",
    "    also get to choose how it's going to do it. That's where \n",
    "    computer algorithms come in. The algorithm is the basic \n",
    "    technique used to get the job done. Let's follow an \n",
    "    example to help get an understanding of the algorithm \n",
    "    concept.'''],\n",
    "    ['''Laptop computers use batteries to run while not \n",
    "    connected to mains. When we overcharge or overheat \n",
    "    lithium ion batteries, the materials inside start to \n",
    "    break down and produce bubbles of oxygen, carbon dioxide, \n",
    "    and other gases. Pressure builds up, and the hot battery \n",
    "    swells from a rectangle into a pillow shape. Sometimes \n",
    "    the phone involved will operate afterwards. Other times \n",
    "    it will die. And occasionally—kapow! To see what's \n",
    "    happening inside the battery when it swells, the CLS team \n",
    "    used an x-ray technology called computed tomography.'''],\n",
    "    ['''This technology describes a technique where touch \n",
    "    sensors can be placed around any side of a device \n",
    "    allowing for new input sources. The patent also notes \n",
    "    that physical buttons (such as the volume controls) could \n",
    "    be replaced by these embedded touch sensors. In essence \n",
    "    Apple could drop the current buttons and move towards \n",
    "    touch-enabled areas on the device for the existing UI. It \n",
    "    could also open up areas for new UI paradigms, such as \n",
    "    using the back of the smartphone for quick scrolling or \n",
    "    page turning.'''],\n",
    "    ['''The National Park Service is a proud protector of \n",
    "    America’s lands. Preserving our land not only safeguards \n",
    "    the natural environment, but it also protects the \n",
    "    stories, cultures, and histories of our ancestors. As we \n",
    "    face the increasingly dire consequences of climate \n",
    "    change, it is imperative that we continue to expand \n",
    "    America’s protected lands under the oversight of the \n",
    "    National Park Service. Doing so combats climate change \n",
    "    and allows all American’s to visit, explore, and learn \n",
    "    from these treasured places for generations to come. It \n",
    "    is critical that President Obama acts swiftly to preserve \n",
    "    land that is at risk of external threats before the end \n",
    "    of his term as it has become blatantly clear that the \n",
    "    next administration will not hold the same value for our \n",
    "    environment over the next four years.'''],\n",
    "    ['''The National Park Foundation, the official charitable \n",
    "    partner of the National Park Service, enriches America’s \n",
    "    national parks and programs through the support of \n",
    "    private citizens, park lovers, stewards of nature, \n",
    "    history enthusiasts, and wilderness adventurers. \n",
    "    Chartered by Congress in 1967, the Foundation grew out of \n",
    "    a legacy of park protection that began over a century \n",
    "    ago, when ordinary citizens took action to establish and \n",
    "    protect our national parks. Today, the National Park \n",
    "    Foundation carries on the tradition of early park \n",
    "    advocates, big thinkers, doers and dreamers—from John \n",
    "    Muir and Ansel Adams to President Theodore Roosevelt.'''],\n",
    "    ['''Australia has over 500 national parks. Over 28 \n",
    "    million hectares of land is designated as national \n",
    "    parkland, accounting for almost four per cent of \n",
    "    Australia's land areas. In addition, a further six per \n",
    "    cent of Australia is protected and includes state \n",
    "    forests, nature parks and conservation reserves.National \n",
    "    parks are usually large areas of land that are protected \n",
    "    because they have unspoilt landscapes and a diverse \n",
    "    number of native plants and animals. This means that \n",
    "    commercial activities such as farming are prohibited and \n",
    "    human activity is strictly monitored.''']\n",
    "], ['documents'])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "First, we will once again use the `RegexTokenizer` and the `StopWordsRemover` models."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "tokenizer = ft.RegexTokenizer(\n",
    "    inputCol='documents', \n",
    "    outputCol='input_arr', \n",
    "    pattern='\\s+|[,.\\\"]')\n",
    "\n",
    "stopwords = ft.StopWordsRemover(\n",
    "    inputCol=tokenizer.getOutputCol(), \n",
    "    outputCol='input_stop')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next in our pipeline is the `CountVectorizer`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "stringIndexer = ft.CountVectorizer(\n",
    "    inputCol=stopwords.getOutputCol(), \n",
    "    outputCol=\"input_indexed\")\n",
    "\n",
    "tokenized = stopwords \\\n",
    "    .transform(\n",
    "        tokenizer\\\n",
    "            .transform(text_data)\n",
    "    )\n",
    "    \n",
    "stringIndexer \\\n",
    "    .fit(tokenized)\\\n",
    "    .transform(tokenized)\\\n",
    "    .select('input_indexed')\\\n",
    "    .take(2)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We will use the `LDA` model - the Latent Dirichlet Allocation model - to extract the topics."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "clustering = clus.LDA(k=2, optimizer='online', featuresCol=stringIndexer.getOutputCol())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Put these puzzles together."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "pipeline = Pipeline(stages=[\n",
    "        tokenizer, \n",
    "        stopwords,\n",
    "        stringIndexer, \n",
    "        clustering]\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's see if we have properly uncovered the topics."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "topics = pipeline \\\n",
    "    .fit(text_data) \\\n",
    "    .transform(text_data)\n",
    "\n",
    "topics.select('topicDistribution').collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Regression"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In this section we will try to predict the `MOTHER_WEIGHT_GAIN`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "features = ['MOTHER_AGE_YEARS','MOTHER_HEIGHT_IN',\n",
    "            'MOTHER_PRE_WEIGHT','DIABETES_PRE',\n",
    "            'DIABETES_GEST','HYP_TENS_PRE', \n",
    "            'HYP_TENS_GEST', 'PREV_BIRTH_PRETERM',\n",
    "            'CIG_BEFORE','CIG_1_TRI', 'CIG_2_TRI', \n",
    "            'CIG_3_TRI'\n",
    "           ]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "First, we will collate all the features together and use the `ChiSqSelector` to select only the top 6 most important features."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "featuresCreator = ft.VectorAssembler(\n",
    "    inputCols=[col for col in features[1:]], \n",
    "    outputCol='features'\n",
    ")\n",
    "\n",
    "selector = ft.ChiSqSelector(\n",
    "    numTopFeatures=6, \n",
    "    outputCol=\"selectedFeatures\", \n",
    "    labelCol='MOTHER_WEIGHT_GAIN'\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In order to predict the weight gain we will use the gradient boosted trees regressor."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import pyspark.ml.regression as reg\n",
    "\n",
    "regressor = reg.GBTRegressor(\n",
    "    maxIter=15, \n",
    "    maxDepth=3,\n",
    "    labelCol='MOTHER_WEIGHT_GAIN')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Finally, again, we put it all together into a `Pipeline`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "pipeline = Pipeline(stages=[\n",
    "        featuresCreator, \n",
    "        selector,\n",
    "        regressor])\n",
    "\n",
    "weightGain = pipeline.fit(births_train)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Having created the `weightGain` model, let's see if it performs well on our testing data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "evaluator = ev.RegressionEvaluator(\n",
    "    predictionCol=\"prediction\", \n",
    "    labelCol='MOTHER_WEIGHT_GAIN')\n",
    "\n",
    "print(evaluator.evaluate(\n",
    "     weightGain.transform(births_test), \n",
    "    {evaluator.metricName: 'r2'}))"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
